{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "原文：<https://zhuanlan.zhihu.com/p/27258289>\n",
    "\n",
    "参考：[PEP 492PEP 525](https://www.python.org/dev/peps/pep-0492/#new-standard-library-functions)\n",
    "\n",
    "本文将会讲述Python 3.5之后出现的async/await的使用方法，以及它们的一些使用目的，如果错误，欢迎指正。\n",
    "\n",
    "Python在3.5版本中引入了关于协程的语法糖async和await，关于协程的概念可以先看我在[上一篇](https://zhuanlan.zhihu.com/p/25228075)文章提到的内容。\n",
    "\n",
    "# 常见函数形式\n",
    "\n",
    "\n",
    "看下Python中常见的几种函数形式：\n",
    "\n",
    "1. 普通函数"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "def function():\n",
    "    return 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. 生成器函数"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generator():\n",
    "     yield 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在3.5过后，我们可以使用async修饰将普通函数和生成器函数包装成异步函数和异步生成器。\n",
    "\n",
    "3. 异步函数（协程）"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def async_function():\n",
    "    return 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "4. 异步生成器"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def async_generator():\n",
    "    yield 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "通过类型判断可以验证函数的类型"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "True\n",
      "True\n",
      "True\n",
      "True\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "D:\\SoftInstall\\Anacoda3\\lib\\site-packages\\ipykernel_launcher.py:5: RuntimeWarning: coroutine 'async_function' was never awaited\n",
      "  \"\"\"\n",
      "RuntimeWarning: Enable tracemalloc to get the object allocation traceback\n"
     ]
    }
   ],
   "source": [
    "import types\n",
    "\n",
    "print(type(function) is types.FunctionType)\n",
    "print(type(generator()) is types.GeneratorType)\n",
    "print(type(async_function()) is types.CoroutineType)\n",
    "print(type(async_generator()) is types.AsyncGeneratorType)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "直接调用异步函数不会返回结果，而是返回一个coroutine对象："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<coroutine object async_function at 0x000002184F42ED48>\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "D:\\SoftInstall\\Anacoda3\\lib\\site-packages\\ipykernel_launcher.py:1: RuntimeWarning: coroutine 'async_function' was never awaited\n",
      "  \"\"\"Entry point for launching an IPython kernel.\n",
      "RuntimeWarning: Enable tracemalloc to get the object allocation traceback\n"
     ]
    }
   ],
   "source": [
    "print(async_function())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "协程需要通过其他方式来驱动，因此可以使用这个协程对象的send方法给协程发送一个值："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "ename": "StopIteration",
     "evalue": "1",
     "output_type": "error",
     "traceback": [
      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[1;31mStopIteration\u001b[0m                             Traceback (most recent call last)",
      "\u001b[1;32m<ipython-input-7-3c970875d3e4>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mprint\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0masync_function\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msend\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;32mNone\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[1;31mStopIteration\u001b[0m: 1"
     ]
    }
   ],
   "source": [
    "print(async_function().send(None))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "不幸的是，如果通过上面的调用会抛出一个异常：\n",
    "\n",
    "`StopIteration: 1`\n",
    "\n",
    "因为生成器/协程在正常返回退出时会抛出一个`StopIteration`异常，而原来的返回值会存放在`StopIteration`对象的`value`属性中，通过以下捕获可以获取协程真正的返回值："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1\n"
     ]
    }
   ],
   "source": [
    "try:\n",
    "    async_function().send(None)\n",
    "except StopIteration as e:\n",
    "    print(e.value)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "通过上面的方式来新建一个run函数来驱动协程函数："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run(coroutine):\n",
    "    try:\n",
    "        coroutine.send(None)\n",
    "    except StopIteration as e:\n",
    "        return e.value"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 异步函数（协程）\n",
    "\n",
    "**在协程函数中，可以通过await语法来主动挂起自身的协程，并等待另一个协程完成直到返回结果：**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1\n"
     ]
    }
   ],
   "source": [
    "async def async_function():\n",
    "    return 1\n",
    "\n",
    "async def await_coroutine():\n",
    "    result = await async_function()\n",
    "    print(result)\n",
    "    \n",
    "run(await_coroutine())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "要注意的是，**await语法只能出现在通过async修饰的函数中，否则会报`SyntaxError`错误。**\n",
    "\n",
    "**而且await后面的对象需要是一个`Awaitable`，或者实现了相关的协议。**\n",
    "\n",
    "查看Awaitable抽象类的代码，表明了**只要一个类实现了`__await__`方法，那么通过它构造出来的实例就是一个`Awaitable`**：\n",
    "\n",
    "```py\n",
    "class Awaitable(metaclass=ABCMeta):\n",
    "    __slots__ = ()\n",
    "\n",
    "    @abstractmethod\n",
    "    def __await__(self):\n",
    "        yield\n",
    "\n",
    "    @classmethod\n",
    "    def __subclasshook__(cls, C):\n",
    "        if cls is Awaitable:\n",
    "            return _check_methods(C, \"__await__\")\n",
    "        return NotImplemented\n",
    "```\n",
    "\n",
    "而且可以看到，`Coroutine`类也继承了`Awaitable`，而且实现了`send`，`throw`和`close`方法。所以`await`一个使用异步的协程对象是合法的。\n",
    "\n",
    "```py\n",
    "class Coroutine(Awaitable):\n",
    "    __slots__ = ()\n",
    "\n",
    "    @abstractmethod\n",
    "    def send(self, value):\n",
    "        ...\n",
    "\n",
    "    @abstractmethod\n",
    "    def throw(self, typ, val=None, tb=None):\n",
    "        ...\n",
    "\n",
    "    def close(self):\n",
    "        ...\n",
    "        \n",
    "    @classmethod\n",
    "    def __subclasshook__(cls, C):\n",
    "        if cls is Coroutine:\n",
    "            return _check_methods(C, '__await__', 'send', 'throw', 'close')\n",
    "        return NotImplemented\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 异步生成器"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "接下来是异步生成器，来看一个例子：\n",
    "\n",
    "假如我要到一家超市去购买土豆，而超市货架上的土豆数量是有限的："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Potato:\n",
    "    @classmethod\n",
    "    def make(cls, num, *args, **kws):\n",
    "        potatos = []\n",
    "        for i in range(num):\n",
    "            potatos.append(cls.__new__(cls, *args, **kws))\n",
    "        return potatos\n",
    "\n",
    "all_potatos = Potato.make(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "现在我想要买50个土豆，每次从货架上拿走一个土豆放到篮子："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 示例代码，不要调用\n",
    "def take_potatos(num):\n",
    "    count = 0\n",
    "    while True:\n",
    "        if len(all_potatos) == 0:\n",
    "            sleep(.1)\n",
    "        else:\n",
    "            potato = all_potatos.pop()\n",
    "            yield potato\n",
    "            count += 1\n",
    "            if count == num:\n",
    "                break\n",
    "\n",
    "def buy_potatos():\n",
    "    bucket = []\n",
    "    for p in take_potatos(50):\n",
    "        bucket.append(p)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "对应到代码中，就是迭代一个生成器的模型，显然，当货架上的土豆不够的时候，这时只能够**死等**，而且在上面例子中等多长时间都不会有结果（因为一切都是同步的），也许可以用多进程和多线程解决，而在现实生活中，更应该像是这样的："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def take_potatos(num):\n",
    "    count = 0\n",
    "    while True:\n",
    "        if len(all_potatos) == 0:\n",
    "            await ask_for_potato()\n",
    "        potato = all_potatos.pop()\n",
    "        yield potato\n",
    "        count += 1\n",
    "        if count == num:\n",
    "            break"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "当货架上的土豆没有了之后，我可以询问超市请求需要更多的土豆，这时候需要等待一段时间直到生产者完成生产的过程："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def ask_for_potato():\n",
    "    await asyncio.sleep(random.random())\n",
    "    all_potatos.extend(Potato.make(random.randint(1, 10)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "当生产者完成和返回之后，这是便能从await挂起的地方继续往下跑，完成消费的过程。而这整一个过程，就是一个异步生成器迭代的流程："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def buy_potatos():\n",
    "    bucket = []\n",
    "    async for p in take_potatos(50):\n",
    "        bucket.append(p)\n",
    "        print(f'Got potato {id(p)}...')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`async for`语法表示我们要后面迭代的是一个异步生成器。\n",
    "\n",
    "```py\n",
    "def main():\n",
    "    import asyncio\n",
    "    loop = asyncio.get_event_loop()\n",
    "    res = loop.run_until_complete(buy_potatos())\n",
    "    loop.close()\n",
    "```\n",
    "\n",
    "结果是这样的\n",
    "```\n",
    "Got potato 2283605158712...\n",
    "Got potato 2283605158656...\n",
    "Got potato 2283605157704...\n",
    "Got potato 2283604970072...\n",
    "Got potato 2283354273720...\n",
    "Got potato 2283606583616...\n",
    "Got potato 2283606583560...\n",
    "...\n",
    "```\n",
    "\n",
    "## 完整示例代码\n",
    "\n",
    "```py\n",
    "import asyncio\n",
    "import random\n",
    "\n",
    "\n",
    "class Potato:\n",
    "    @classmethod\n",
    "    def make(cls, num, *args, **kws):\n",
    "        potatoes = []\n",
    "        for i in range(num):\n",
    "            potatoes.append(cls.__new__(cls, *args, **kws))\n",
    "        return potatoes\n",
    "\n",
    "\n",
    "all_potatoes = Potato.make(5)\n",
    "\n",
    "\n",
    "async def take_potatoes(num):\n",
    "    count = 0\n",
    "    while True:\n",
    "        if len(all_potatoes) == 0:\n",
    "            await ask_for_potato()\n",
    "        yield all_potatoes.pop()\n",
    "        count += 1\n",
    "        if count == num:\n",
    "            break\n",
    "\n",
    "\n",
    "async def ask_for_potato():\n",
    "    await asyncio.sleep(random.random())\n",
    "    all_potatoes.extend(Potato.make(random.randint(1, 10)))\n",
    "\n",
    "\n",
    "async def buy_potatoes():\n",
    "    bucket = []\n",
    "    async for p in take_potatoes(50):\n",
    "        bucket.append(p)\n",
    "        print(f'Got potato {id(p)}...')\n",
    "\n",
    "\n",
    "def main():\n",
    "    loop = asyncio.get_event_loop()\n",
    "    loop.run_until_complete(buy_potatoes())\n",
    "    loop.close()\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    main()\n",
    "\n",
    "```\n",
    "\n",
    "## 多个异步任务\n",
    "\n",
    "既然是异步的，在请求之后不一定要死等，而是可以做其他事情。比如除了土豆，我还想买番茄，这时只需要在事件循环中再添加一个过程：\n",
    "```py\n",
    "import asyncio\n",
    "\n",
    "def main():\n",
    "    loop = asyncio.get_event_loop()\n",
    "    res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))\n",
    "    loop.close()\n",
    "```\n",
    "\n",
    "再来运行这段代码：\n",
    "\n",
    "```\n",
    "Got potato 4423119312...\n",
    "Got tomato 4423119368...\n",
    "Got potato 4429291024...\n",
    "Got potato 4421640768...\n",
    "Got tomato 4429331704...\n",
    "Got tomato 4429331760...\n",
    "Got tomato 4423119368...\n",
    "Got potato 4429331760...\n",
    "Got potato 4429331704...\n",
    "Got potato 4429346688...\n",
    "Got potato 4429346072...\n",
    "Got tomato 4429347360...\n",
    "...\n",
    "\n",
    "```\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 异步生成器的定义\n",
    "\n",
    "看下`AsyncGenerator`的定义，它需要实现`__aiter__`和`__anext__`两个核心方法，以及`asend`，`athrow`，`aclose`方法。\n",
    "\n",
    "```py\n",
    "class AsyncGenerator(AsyncIterator):\n",
    "    __slots__ = ()\n",
    "\n",
    "    async def __anext__(self):\n",
    "        ...\n",
    "\n",
    "    @abstractmethod\n",
    "    async def asend(self, value):\n",
    "        ...\n",
    "\n",
    "    @abstractmethod\n",
    "    async def athrow(self, typ, val=None, tb=None):\n",
    "        ...\n",
    "\n",
    "    async def aclose(self):\n",
    "        ...\n",
    "\n",
    "    @classmethod\n",
    "    def __subclasshook__(cls, C):\n",
    "        if cls is AsyncGenerator:\n",
    "            return _check_methods(C, '__aiter__', '__anext__',\n",
    "                                  'asend', 'athrow', 'aclose')\n",
    "        return NotImplemented\n",
    "```        "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 常见用法\n",
    "\n",
    "## 异步推导式\n",
    "\n",
    "异步生成器是在3.6之后才有的特性，同样的还有异步推导表达式，因此在上面的例子中，也可以写成这样：\n",
    "\n",
    "```py\n",
    "bucket = [p async for p in take_potatos(50)]\n",
    "```\n",
    "\n",
    "类似的，还有await表达式：\n",
    "\n",
    "```py\n",
    "result = [await fun() for fun in funcs if await condition()]\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 装饰类方法\n",
    "\n",
    "除了函数之外，类实例的普通方法也能用async语法修饰："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "\n",
    "class ThreeTwoOne:\n",
    "    async def begin(self):\n",
    "        print(3)\n",
    "        await asyncio.sleep(1)\n",
    "        print(2)\n",
    "        await asyncio.sleep(1)\n",
    "        print(1)        \n",
    "        await asyncio.sleep(1)\n",
    "        return\n",
    "\n",
    "async def game():\n",
    "    await ThreeTwoOne.begin()\n",
    "    print('start')\n",
    "    \n",
    "\n",
    "# loop = asyncio.get_event_loop()\n",
    "# loop.run_until_complete(game())\n",
    "# loop.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<function __main__.ThreeTwoOne.begin(self)>"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ThreeTwoOne.begin"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<bound method function of <class '__main__.ThreeTwoOne'>>"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "function.__get__(ThreeTwoOne, ThreeTwoOne())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "D:\\SoftInstall\\Anacoda3\\lib\\site-packages\\ipykernel_launcher.py:7: RuntimeWarning: coroutine 'ThreeTwoOne.begin' was never awaited\n",
      "  import sys\n",
      "RuntimeWarning: Enable tracemalloc to get the object allocation traceback\n"
     ]
    }
   ],
   "source": [
    "function = ThreeTwoOne.begin\n",
    "method = function.__get__(ThreeTwoOne, ThreeTwoOne())\n",
    "\n",
    "import inspect\n",
    "assert inspect.isfunction(function)\n",
    "assert inspect.ismethod(method)\n",
    "assert inspect.iscoroutine(method())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "同理还有类方法："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "class ThreeTwoOne:\n",
    "    @classmethod\n",
    "    async def begin(cls):\n",
    "        print(3)\n",
    "        await asyncio.sleep(1)\n",
    "        print(2)\n",
    "        await asyncio.sleep(1)\n",
    "        print(1)\n",
    "        await asyncio.sleep(1)\n",
    "        return\n",
    "\n",
    "\n",
    "async def game():\n",
    "    await ThreeTwoOne.begin()\n",
    "    print('start')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 上下文管理器\n",
    "\n",
    "根据PEP 492中，`async`也可以应用到上下文管理器中，`__aenter__`和`__aexit__`需要返回一个`Awaitable`："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "\n",
    "class GameContext:\n",
    "    async def __aenter__(self):\n",
    "        print('game loading...')\n",
    "        await asyncio.sleep(1)\n",
    "\n",
    "    async def __aexit__(self, exc_type, exc, tb):\n",
    "        print('game exit...')\n",
    "        await asyncio.sleep(1)\n",
    "\n",
    "\n",
    "async def game():\n",
    "    async with GameContext():\n",
    "        print('game start...')\n",
    "        await asyncio.sleep(2)\n",
    "\n",
    "\n",
    "# loop = asyncio.get_event_loop()\n",
    "# loop.run_until_complete(game())\n",
    "# loop.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在3.7版本，contextlib中会新增一个asynccontextmanager装饰器来包装一个实现异步协议的上下文管理器："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "from contextlib import asynccontextmanager\n",
    "\n",
    "\n",
    "@asynccontextmanager\n",
    "async def get_connection():\n",
    "    conn = await acquire_db_connection()\n",
    "    try:\n",
    "        yield\n",
    "    finally:\n",
    "        await release_db_connection(conn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`async`修饰符也能用在`__call__`方法上："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "from time import time\n",
    "\n",
    "\n",
    "class GameContext:\n",
    "\n",
    "    async def __aenter__(self):\n",
    "        self._started = time()\n",
    "        print('game loading...')\n",
    "        await asyncio.sleep(1)\n",
    "        return self\n",
    "\n",
    "    async def __aexit__(self, exc_type, exc, tb):\n",
    "        print('game exit...')\n",
    "        await asyncio.sleep(1)\n",
    "\n",
    "    async def __call__(self, *args, **kws):\n",
    "        if args[0] == 'time':\n",
    "            return time() - self._started\n",
    "\n",
    "\n",
    "async def game():\n",
    "    async with GameContext() as ctx:\n",
    "        print('game start...')\n",
    "        await asyncio.sleep(2)\n",
    "        print('game time: ', await ctx('time'))\n",
    "\n",
    "# loop = asyncio.get_event_loop()\n",
    "# loop.run_until_complete(game())\n",
    "# loop.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# await和yield from\n",
    "\n",
    "`await`和`yield from`\n",
    "\n",
    "Python3.3的`yield from`语法可以把生成器的操作委托给另一个生成器，生成器的调用方可以直接与子生成器进行通信："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1\n",
      "2\n",
      "3\n"
     ]
    }
   ],
   "source": [
    "def sub_gen():\n",
    "    yield 1\n",
    "    yield 2\n",
    "    yield 3\n",
    "\n",
    "def gen():\n",
    "    return (yield from sub_gen())\n",
    "\n",
    "def main():\n",
    "    for val in gen():\n",
    "        print(val)\n",
    "\n",
    "main()        "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "利用这一特性，使用`yield from`能够编写出类似协程效果的函数调用，在3.5之前，`asyncio`正是使用`@asyncio.coroutine`和`yield from`语法来创建协程"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "# https://docs.python.org/3.4/library/asyncio-task.html\n",
    "import asyncio\n",
    "\n",
    "@asyncio.coroutine\n",
    "def compute(x, y):\n",
    "    print(\"Compute %s + %s ...\" % (x, y))\n",
    "    yield from asyncio.sleep(1.0)\n",
    "    return x + y\n",
    "\n",
    "@asyncio.coroutine\n",
    "def print_sum(x, y):\n",
    "    result = yield from compute(x, y)\n",
    "    print(\"%s + %s = %s\" % (x, y, result))\n",
    "\n",
    "# loop = asyncio.get_event_loop()\n",
    "# loop.run_until_complete(print_sum(1, 2))\n",
    "# loop.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**然而，用`yield from`容易在表示协程和生成器中混淆，没有良好的语义性**，所以在Python 3.5推出了更新的async/await表达式来作为协程的语法。\n",
    "\n",
    "因此类似以下的调用是等价的：\n",
    "\n",
    "```py\n",
    "async with lock:\n",
    "    ...\n",
    "    \n",
    "with (yield from lock):\n",
    "    ...\n",
    "######################\n",
    "def main():\n",
    "    return (yield from coro())\n",
    "\n",
    "def main():\n",
    "    return (await coro())\n",
    "```    \n",
    "\n",
    "那么，怎么把生成器包装为一个协程对象呢？这时候可以用到`types`包中的`coroutine`装饰器（如果使用`asyncio`做驱动的话，那么也可以使用`asyncio.coroutine`装饰器），`@types.coroutine`装饰器会将一个生成器函数包装为协程对象："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import types\n",
    "\n",
    "@types.coroutine\n",
    "def compute(x, y):\n",
    "    print(\"Compute %s + %s ...\" % (x, y))\n",
    "    yield from asyncio.sleep(1.0)\n",
    "    return x + y\n",
    "\n",
    "async def print_sum(x, y):\n",
    "    result = await compute(x, y)\n",
    "    print(\"%s + %s = %s\" % (x, y, result))\n",
    "\n",
    "# loop = asyncio.get_event_loop()\n",
    "# loop.run_until_complete(print_sum(1, 2))\n",
    "# loop.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "尽管两个函数分别使用了新旧语法，但他们都是协程对象，也分别称作`native coroutine`以及`generator-based coroutine`，因此不用担心语法问题。\n",
    "\n",
    "下面观察一个asyncio中Future的例子："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "ename": "RuntimeError",
     "evalue": "This event loop is already running",
     "output_type": "error",
     "traceback": [
      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[1;31mRuntimeError\u001b[0m                              Traceback (most recent call last)",
      "\u001b[1;32m<ipython-input-27-7cca93a3c0c0>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m\u001b[0m\n\u001b[0;32m     13\u001b[0m loop.run_until_complete(asyncio.wait([\n\u001b[0;32m     14\u001b[0m     \u001b[0mcoro1\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 15\u001b[1;33m     \u001b[0mcoro2\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m     16\u001b[0m ]))\n\u001b[0;32m     17\u001b[0m \u001b[0mloop\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mclose\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32mD:\\SoftInstall\\Anacoda3\\lib\\asyncio\\base_events.py\u001b[0m in \u001b[0;36mrun_until_complete\u001b[1;34m(self, future)\u001b[0m\n\u001b[0;32m    569\u001b[0m         \u001b[0mfuture\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0madd_done_callback\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0m_run_until_complete_cb\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    570\u001b[0m         \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 571\u001b[1;33m             \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mrun_forever\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    572\u001b[0m         \u001b[1;32mexcept\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    573\u001b[0m             \u001b[1;32mif\u001b[0m \u001b[0mnew_task\u001b[0m \u001b[1;32mand\u001b[0m \u001b[0mfuture\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdone\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mand\u001b[0m \u001b[1;32mnot\u001b[0m \u001b[0mfuture\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcancelled\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32mD:\\SoftInstall\\Anacoda3\\lib\\asyncio\\base_events.py\u001b[0m in \u001b[0;36mrun_forever\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m    524\u001b[0m         \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_check_closed\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    525\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mis_running\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 526\u001b[1;33m             \u001b[1;32mraise\u001b[0m \u001b[0mRuntimeError\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m'This event loop is already running'\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    527\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mevents\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_get_running_loop\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mis\u001b[0m \u001b[1;32mnot\u001b[0m \u001b[1;32mNone\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    528\u001b[0m             raise RuntimeError(\n",
      "\u001b[1;31mRuntimeError\u001b[0m: This event loop is already running"
     ]
    }
   ],
   "source": [
    "import asyncio\n",
    "\n",
    "future = asyncio.Future()\n",
    "\n",
    "async def coro1():\n",
    "    await asyncio.sleep(1)\n",
    "    future.set_result('data')\n",
    "\n",
    "async def coro2():\n",
    "    print(await future)\n",
    "\n",
    "loop = asyncio.get_event_loop()\n",
    "loop.run_until_complete(asyncio.wait([\n",
    "    coro1(), \n",
    "    coro2()\n",
    "]))\n",
    "loop.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "两个协程在在事件循环中，协程coro1在执行第一句后挂起自身切到`asyncio.sleep`，而协程coro2一直等待future的结果，让出事件循环，计时器结束后coro1执行了第二句设置了future的值，被挂起的coro2恢复执行，打印出future的结果'data'。\n",
    "\n",
    "future可以被`await`证明了future对象是一个`Awaitable`，进入Future类的源码可以看到有一段代码显示了future实现了`__await__`协议：\n",
    "\n",
    "```py\n",
    "class Future:\n",
    "    ...\n",
    "    def __iter__(self):\n",
    "        if not self.done():\n",
    "            self._asyncio_future_blocking = True\n",
    "            yield self  # This tells Task to wait for completion.\n",
    "        assert self.done(), \"yield from wasn't used with future\"\n",
    "        return self.result()  # May raise too.\n",
    "\n",
    "    if compat.PY35:\n",
    "        __await__ = __iter__ # make compatible with 'await' expression\n",
    "```\n",
    "\n",
    "当执行await future这行代码时，future中的这段代码就会被执行，首先future检查它自身是否已经完成，如果没有完成，挂起自身，告知当前的Task（任务）等待future完成。\n",
    "\n",
    "当future执行set_result方法时，会触发以下的代码，设置结果，标记future已经完成：\n",
    "\n",
    "```py\n",
    "def set_result(self, result):\n",
    "    ...\n",
    "    if self._state != _PENDING:\n",
    "        raise InvalidStateError('{}: {!r}'.format(self._state, self))\n",
    "    self._result = result\n",
    "    self._state = _FINISHED\n",
    "    self._schedule_callbacks()\n",
    "```    \n",
    "\n",
    "最后future会调度自身的回调函数，触发Task._step()告知Task驱动future从之前挂起的点恢复执行，不难看出，future会执行下面的代码：\n",
    "\n",
    "```py\n",
    "class Future:\n",
    "    ...\n",
    "    def __iter__(self):\n",
    "        ...\n",
    "        assert self.done(), \"yield from wasn't used with future\"\n",
    "        return self.result()  # May raise too.\n",
    "```\n",
    "\n",
    "最终返回结果给调用方。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 其他协程库：curio\n",
    "\n",
    "前面讲了那么多关于`asyncio`的例子，那么除了`asyncio`，就没有其他协程库了吗？`asyncio`作为python的标准库，自然受到很多青睐，但它有时候还是显得太重量了，尤其是提供了许多复杂的轮子和协议，不便于使用。\n",
    "\n",
    "你可以理解为，`asyncio`是使用`async/await`语法开发的协程库，而不是有`asyncio`才能用`async/await`，除了`asyncio`之外，`curio`和`trio`是更加轻量级的替代物，而且也更容易使用。\n",
    "\n",
    "`curio`的作者是David Beazley，下面是使用`curio`创建Tcp server的例子，据说这是dabeaz理想中的一个异步服务器的样子："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from curio import run, spawn\n",
    "from curio.socket import *\n",
    "\n",
    "\n",
    "async def echo_server(address):\n",
    "    sock = socket(AF_INET, SOCK_STREAM)\n",
    "    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)\n",
    "    sock.bind(address)\n",
    "    sock.listen(5)\n",
    "    print('Server listening at', address)\n",
    "    async with sock:\n",
    "        while True:\n",
    "            client, addr = await sock.accept()\n",
    "            await spawn(echo_client, client, addr)\n",
    "\n",
    "\n",
    "async def echo_client(client, addr):\n",
    "    print('Connection from', addr)\n",
    "    async with client:\n",
    "        while True:\n",
    "            data = await client.recv(100000)\n",
    "            if not data:\n",
    "                break\n",
    "            await client.sendall(data)\n",
    "    print('Connection closed')\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    run(echo_server, ('', 25000))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "验证一下：\n",
    "```\n",
    "$ nc localhost 25000\n",
    "hello world\n",
    "hello world\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "无论是`asyncio`还是`curio`，或者是其他异步协程库，在背后往往都会借助于IO的事件循环来实现异步，下面用几十行代码来展示一个简陋的基于事件驱动的echo服务器："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR\n",
    "from selectors import DefaultSelector, EVENT_READ\n",
    "\n",
    "selector = DefaultSelector()\n",
    "pool = {}\n",
    "\n",
    "\n",
    "def request(client_socket, addr):\n",
    "    client_socket, addr = client_socket, addr\n",
    "\n",
    "    def handle_request(key, mask):\n",
    "        data = client_socket.recv(100000)\n",
    "        if not data:\n",
    "            client_socket.close()\n",
    "            selector.unregister(client_socket)\n",
    "            del pool[addr]\n",
    "        else:\n",
    "            client_socket.sendall(data)\n",
    "\n",
    "    return handle_request\n",
    "\n",
    "\n",
    "def recv_client(key, mask):\n",
    "    sock = key.fileobj\n",
    "    client_socket, addr = sock.accept()\n",
    "    req = request(client_socket, addr)\n",
    "    pool[addr] = req\n",
    "    selector.register(client_socket, EVENT_READ, req)\n",
    "\n",
    "\n",
    "def echo_server(address):\n",
    "    sock = socket(AF_INET, SOCK_STREAM)\n",
    "    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)\n",
    "    sock.bind(address)\n",
    "    sock.listen(5)\n",
    "    selector.register(sock, EVENT_READ, recv_client)\n",
    "    try:\n",
    "        while True:\n",
    "            events = selector.select()\n",
    "            for key, mask in events:\n",
    "                callback = key.data\n",
    "                callback(key, mask)\n",
    "    except KeyboardInterrupt:\n",
    "        sock.close()\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    echo_server(('', 25000))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "$ nc localhost 25000\n",
    "hello world\n",
    "hello world\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 总结"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "- 完成异步的代码不一定要用`async/await`\n",
    "- 使用了`async/await`的代码也不一定能做到异步\n",
    "- `async/await`是协程的语法糖，使协程之间的调用变得更加清晰\n",
    "- 使用async修饰的函数调用时会返回一个协程对象\n",
    "- await只能放在async修饰的函数里面使用，await后面必须要跟着一个协程对象或Awaitable\n",
    "- await的目的是等待协程控制流的返回，而实现暂停并挂起函数的操作是yield。\n",
    "\n",
    "个人认为，`async/await`以及协程是Python未来实现异步编程的趋势，我们将会在更多的地方看到他们的身影，例如协程库的curio和trio，web框架的sanic，数据库驱动的asyncpg等等...在Python 3主导的今天，作为开发者，应该及时拥抱和适应新的变化，而基于`async/await`的协程凭借良好的可读性和易用性日渐登上舞台，看到这里，你还不赶紧上车？\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": true
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
